package com.computer.fundamentals.concurrentprogram;

import com.computer.util.Constant;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * 阻塞队列的作用：
 *      阻塞队列（BlockingQueue）是一个支持两个附加操作的队列。
 *      这两个附加的操作是：
 *          1. 在队列为空时，获取元素的线程会等待队列变为非空。
 *          2. 当队列满时，存储元素的线程会等待队列可用。
 *      阻塞队列常用于生产者和消费者的场景，生产者是往队列里添加元素的线程，消费者是从队列里拿元素的线程。
 *      阻塞队列就是生产者存放元素的容器，而消费者也只从容器里拿元素。
 */
public class BlockingQueue {

    public Object[] items; // 存储元素

    public int capacity; // 队列容量

    public int insertIndex; // 元素插入位置

    public int removeIndex; // 元素删除位置

    public int size; // 当前元素数量

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
        this.items = new Object[capacity];
        this.insertIndex = 0;
        this.removeIndex = 0;
        this.size = 0;
    }

    /**
     * 元素入队
     *
     * 慢速版问题
     *      Thread.sleep(200)这段代码会让put()和remove()方法分别在队列已满和队列为空的情况下进入一次固定的200毫秒的休眠
     *      防止线程占用过多的CPU资源。但是如果队列在这200毫秒里发生了变化，那么线程也还是在休眠状态无法马上对变化做出响应。
     *      比如如果一个调用put()方法的线程因为队列已满而进入了200毫秒的休眠，那么即使队列已经被消费者线程清空了，
     *      它也仍然会忠实地等到200毫秒之后才会重新尝试向队列中插入元素，中间的这些时间就都被浪费了。
     */
    public void put(Object object) throws InterruptedException {
        // 慢速版
//        while (true) {
//            synchronized (this) {
//                if (size != capacity) {
//                    items[insertIndex] = object;
//                    if (insertIndex + 1 == capacity) {
//                        insertIndex = 0;
//                    }else {
//                        insertIndex++;
//                    }
//                    size++;
//                    break;
//                }
//            }
//            Thread.sleep(200);
//        }

        // 快速版
        synchronized (this) {
            while (size == capacity) {
                this.wait();
            }
            items[insertIndex] = object;
            if (++insertIndex == capacity) {
                insertIndex = 0;
            }
            size++;

            this.notifyAll();
        }
    }

    /**
     * 元素出队
     */
    public Object remove() throws InterruptedException {
        // 慢速版
//        while (true) {
//            synchronized (this) {
//                if (size != 0) {
//                    Object object = items[removeIndex];
//                    items[removeIndex] = null;
//                    if (removeIndex + 1 == capacity) {
//                        removeIndex = 0;
//                    }else {
//                        removeIndex++;
//                    }
//                    size--;
//                    return object;
//                }
//            }
//            Thread.sleep(200);
//        }

        // 快速版
        synchronized (this) {
            while (size == 0) {
                this.wait();
            }
            Object object = items[removeIndex];
            items[removeIndex] = null;
            if (++removeIndex == capacity) {
                removeIndex = 0;
            }
            size--;
            this.notifyAll();
            return object;
        }
    }

    /**
     * 测试
     */
    public static void main(String[] args) {

        // 创建一个容量为2的阻塞队列
        final BlockingQueue blockingQueue = new BlockingQueue(2);

        // 创建2个线程，每个线程循环执行10次
        final int threads = 2;
        final int times = 10;

        // 创建线程列表
        List<Thread> threadList = new ArrayList<>(threads * 2);
        Random randomFactory = new Random(Constant.SEED);
        long startTimes = System.currentTimeMillis();

        // 创建生产者线程
        for (int i = 0;i < threads;i++) {
            Thread producer = new Thread(() -> {
                try {
                    for (int j = 0;j < times;j++) {
                        int num = randomFactory.nextInt(times);
                        blockingQueue.put(num);
                        System.out.println(Thread.currentThread().getName() + "插入：" + num);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            threadList.add(producer);
            producer.start();
        }

        // 创建消费者线程
        for (int i = 0;i < threads;i++) {
            Thread consumer = new Thread(() -> {
                try {
                    for (int j = 0;j < times;j++) {
                        int remove = (int) blockingQueue.remove();
                        System.out.println(Thread.currentThread().getName() + "取出：" + remove);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            threadList.add(consumer);
            consumer.start();
        }

        // 等待线程执行结束
        for (Thread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long endTimes = System.currentTimeMillis();
        System.out.println("总耗时： " + (endTimes - startTimes));
    }
}